package me.seawenc.db.checker.dbengine.impl.engines;

import com.alibaba.fastjson.JSONObject;
import me.seawenc.db.checker.dbengine.DsType;
import me.seawenc.db.checker.bean.FileConfig;
import me.seawenc.db.checker.dbengine.impl.engines.bean.TableColumnBean;
import me.seawenc.db.checker.dbengine.impl.engines.bean.TableDetailBean;
import me.seawenc.db.checker.dbengine.impl.engines.bean.TableNameBean;
import me.seawenc.db.checker.dbengine.impl.utils.SqlActuator;
import me.seawenc.db.checker.helper.Log;
import me.seawenc.db.checker.helper.Optionalx;
import org.apache.commons.lang.StringEscapeUtils;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static me.seawenc.db.checker.dbengine.impl.engines.constant.ConstantFieldName.TAB_NAME;

/**
 * 数据库执行引擎-hive实现
 * Servicename的格式为：dbengine_{com.dgp.common.enums.DsType}
 */
public class EngineServiceHive extends EngineServiceBase {

    public EngineServiceHive(FileConfig conf) throws Exception {
        super(conf);
    }

    @Override
    protected DsType getDsType() {
        return DsType.HIVE;
    }

    @Override
    public List<TableNameBean> findAllTables() throws Exception {

        List<JSONObject> jsonObjects = sqlActuator.execQuerySql(getAllTablesSql(DsType.HIVE.lname()));
        List<TableNameBean> tableNames = jsonObjects.stream()
                //每个表注释需要单独查询
                .map(v -> new TableNameBean(getTableName(v), findTableNotes(sqlActuator, v)))
                .collect(Collectors.toList());
        sqlActuator.close();
        return tableNames;
    }

    private String getTableName(JSONObject obj) {
        // 当引擎类型为hive时,列名为tab_name,为spark时，列名为：tableName
        String name= Optionalx.ofByDefGet(obj.getString(TAB_NAME.toString()),obj.getString("tableName"));
        Log.info("fixUnicode before:%s, after:%s",name,StringEscapeUtils.unescapeJava(name));
        return StringEscapeUtils.unescapeJava(name);
    }


    @Override
    protected TableDetailBean execFindTableColumns(String tableName,String extType) throws Exception {
        String sql = getTableDescribeSql(tableName,extType);
        List<JSONObject> result = sqlActuator.execQuerySql(sql);

        TableDetailBean tableInfo = new TableDetailBean(tableName);
        if(Optionalx.isNotPresent(result)){
            return tableInfo;
        }
        //解决重复列问题
        Map<String,TableColumnBean> cols=new HashMap<>();
        int order=0;
        for(JSONObject col:result){
            String colName=col.getString("col_name");
            if(Optionalx.isNotPresent(colName) || colName.contains("#")||colName.contains(":")){
                //过滤掉无效的结果
                continue;
            }
            order++;
            TableColumnBean tableColumn = new TableColumnBean();
            tableColumn.setColumnName(col.getString("col_name"));
            tableColumn.setDataType(col.getString("data_type"));
//            tableColumn.setComment(col.getString("comment"));
//            tableColumn.setAttnum(order+"");
            tableInfo.addCol(tableColumn);
            cols.put(tableColumn.getColumnName(),tableColumn);
        }
        tableInfo.setTableColumns(cols.values().stream().collect(Collectors.toList()));
        return tableInfo;
    }

    /**
     * 由于hive的表注释在表结构里面，因此所有表都要单独查询一次
     * @param actuator
     * @return
     */
    private String findTableNotes(SqlActuator actuator, JSONObject table) {
        String tableName=getTableName(table);
        try {
            List<JSONObject> ret = actuator.execQuerySql("DESCRIBE FORMATTED " + tableName);
            for(JSONObject col:ret){
                if("comment".equals(col.getString("data_type").trim())){
                    return (StringEscapeUtils.unescapeJava(col.getString("comment"))+"").trim();
                }
            }
        } catch (Exception throwables) {
            Log.warn("查询表注释出错:"+tableName,throwables);
        }
        return tableName;
    }
}
